You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2010/08/31 21:12:15 UTC

svn commit: r991295 - in /incubator/lcf/trunk/modules: connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/ framework/agents/src/main/java/org/apache/acf/agents/interfaces/ framework/agents/src/main/java/org/apache/acf/agents/outp...

Author: kwright
Date: Tue Aug 31 19:12:14 2010
New Revision: 991295

URL: http://svn.apache.org/viewvc?rev=991295&view=rev
Log:
Add support for job end notification to output connectors, with guaranteed retry until the notification succeeds.  Fix for CONNECTORS-41.

Added:
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java   (with props)
Modified:
    incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java
    incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java
    incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java
    incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java
    incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java
    incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java

Modified: incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java (original)
+++ incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/HttpPoster.java Tue Aug 31 19:12:14 2010
@@ -176,6 +176,77 @@ public class HttpPoster
       interruptionRetryTime = new Long(x).longValue();
   }
 
+  /** Cause a commit to happen, retrying on IO errors (up to 5 retries, 10 seconds apart) before throwing ServiceInterruption.
+  */
+  public void commitPost()
+    throws ACFException, ServiceInterruption
+  {
+    if (Logging.ingest.isDebugEnabled())
+      Logging.ingest.debug("commitPost()");
+    // Number of additional attempts still allowed after an IOException.
+    int ioErrorRetry = 5;
+    while (true)
+    {
+      // Run the request on its own thread (CommitThread) so that a hung socket wait can be abandoned.
+      try
+      {
+        CommitThread t = new CommitThread();
+        try
+        {
+          t.start();
+          t.join();
+          // Rethrow whatever the commit thread recorded, preserving its type; no recorded exception means success.
+          Throwable thr = t.getException();
+          if (thr != null)
+          {
+            if (thr instanceof ServiceInterruption)
+              throw (ServiceInterruption)thr;
+            if (thr instanceof ACFException)
+              throw (ACFException)thr;
+            if (thr instanceof IOException)
+              throw (IOException)thr;
+            if (thr instanceof RuntimeException)
+              throw (RuntimeException)thr;
+            else
+              throw (Error)thr;
+          }
+          return;
+        }
+        catch (InterruptedException e)
+        {
+          t.interrupt();
+          throw new ACFException("Interrupted: "+e.getMessage(),ACFException.INTERRUPTED);
+        }
+      }
+      catch (IOException ioe)
+      {
+        if (ioErrorRetry == 0)
+        {
+          long currentTime = System.currentTimeMillis();
+          throw new ServiceInterruption("IO exception committing: "+ioe.getMessage(),
+            ioe,
+            currentTime + interruptionRetryTime,
+            currentTime + 2L * 60L * 60000L,
+            -1,
+            true);
+        }
+      }
+
+      // Only an IOException with retries remaining gets here: wait 10 seconds, then try again.
+      // Being interrupted during the wait aborts the commit.
+      try
+      {
+        ACF.sleep(10000L);
+      }
+      catch (InterruptedException e)
+      {
+        throw new ACFException("Interrupted",ACFException.INTERRUPTED);
+      }
+      ioErrorRetry--;
+
+    }
+
+  }
   
   /**
   * Post the input stream to ingest
@@ -1524,6 +1595,130 @@ public class HttpPoster
       return activityDetails;
     }
   }
+  
+  /** Killable thread that does a commit.
+  * Java 1.5 stopped permitting thread interruptions to abort socket waits.  As a result, it is impossible to get threads to shutdown cleanly that are doing
+  * such waits.  So, the places where this happens are segregated in their own threads so that they can be just abandoned.
+  *
+  * This thread sends a GET to postUpdateAction with commit=true, and records any failure in getException() for commitPost() to rethrow.
+  */
+  protected class CommitThread extends java.lang.Thread
+  {
+    protected Throwable exception = null;
+
+    public CommitThread()
+    {
+      super();
+      setDaemon(true);
+    }
+
+    public void run()
+    {
+      try
+      {
+        // Do the operation!
+        // Open a socket to update request handler, and to the response stream to get the post result
+        try
+        {
+          // Set up the socket, and the (optional) secure socket.
+          Socket socket = createSocket(responseRetries);
+          try
+          {
+            InputStream in = socket.getInputStream();
+            try
+            {
+              OutputStream out = socket.getOutputStream();
+              try
+              {
+                // Send the commit request line
+                byte[] tmp = ("GET " + postUpdateAction + "?commit=true HTTP/1.0\r\n").getBytes("ASCII");
+                out.write(tmp, 0, tmp.length);
+
+                writeCredentials(out);
+
+                tmp = ("Content-Length: 0\r\n\r\n").getBytes("ASCII");
+                out.write(tmp, 0, tmp.length);
+
+                if (Logging.ingest.isDebugEnabled())
+                  Logging.ingest.debug("Commit request posted");
+
+                out.flush();
+
+                CodeDetails cd = getResponse(in);
+
+                int codeValue = cd.getCodeValue();
+                if (codeValue < 0)
+                  throw new ACFException("Http protocol error");
+
+                // 200 means everything went OK
+                if (codeValue == 200)
+                {
+                  cd.parseCommitResponse();
+                  return;
+                }
+
+                // 401 means bad credentials: a setup problem, reported as such rather than as a transient failure
+                if (codeValue == 401)
+                  throw new ACFException("Bad credentials for commit request",ACFException.SETUP_ERROR);
+
+                // Anything else means the commit request failed.
+                throw new ACFException("Error connecting to update request API: '"+cd.getDescription()+"'");
+              }
+              finally
+              {
+                out.close();
+              }
+            }
+            finally
+            {
+              in.close();
+            }
+          }
+          finally
+          {
+            try
+            {
+              socket.close();
+            }
+            catch (InterruptedIOException e)
+            {
+              throw e;
+            }
+            catch (IOException e)
+            {
+              Logging.ingest.debug("Error closing socket: "+e.getMessage(),e);
+              // Do NOT rethrow
+            }
+          }
+        }
+        catch (UnsupportedEncodingException ioe)
+        {
+          throw new ACFException("Fatal commit error: "+ioe.getMessage(),ioe);
+        }
+        catch (InterruptedIOException ioe)
+        {
+          // Socket timeouts (SocketTimeoutException) land here too: record them so commitPost() retries instead of treating the commit as done.
+          throw ioe;
+        }
+        catch (IOException ioe)
+        {
+          // Log the error
+          Logging.ingest.warn("Error communicating with update request handler: "+ioe.getMessage(),ioe);
+          throw ioe;
+        }
+      }
+      catch (Throwable e)
+      {
+        this.exception = e;
+      }
+    }
+
+    public Throwable getException()
+    {
+      return exception;
+    }
+  }
+
 
   /** Killable thread that does a status check.
   * Java 1.5 stopped permitting thread interruptions to abort socket waits.  As a result, it is impossible to get threads to shutdown cleanly that are doing
@@ -1775,6 +1970,12 @@ public class HttpPoster
       parseIngestionResponse();
     }
 
+    public void parseCommitResponse()
+      throws ACFException
+    {
+      parseIngestionResponse(); // A commit reply is parsed exactly the same way as an ingestion reply.
+    }
+    
     public void parseStatusResponse()
       throws ACFException
     {

Modified: incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java (original)
+++ incubator/lcf/trunk/modules/connectors/solr/connector/src/main/java/org/apache/acf/agents/output/solr/SolrConnector.java Tue Aug 31 19:12:14 2010
@@ -358,6 +358,20 @@ public class SolrConnector extends org.a
     poster.deletePost(documentURI,activities);
   }
 
+  /** Notify the connector of a completed job.
+  * This is meant to allow the connector to flush any internal data structures it has been keeping around, or to tell the output repository that this
+  * is a good time to synchronize things.  It is called whenever a job is either completed or aborted.  The Solr connector responds by sending a commit request.
+  */
+  public void noteJobComplete()
+    throws ACFException, ServiceInterruption
+  {
+    // Establish a session (presumably this is what sets up 'poster' -- confirm in getSession()).
+    getSession();
+    
+    // Ask Solr to commit; commitPost() retries IO errors itself before throwing ServiceInterruption.
+    poster.commitPost();
+  }
+
   // UI support methods.
   //
   // These support methods come in two varieties.  The first bunch is involved in setting up connection configuration information.  The second bunch

Modified: incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java (original)
+++ incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/interfaces/IOutputConnector.java Tue Aug 31 19:12:14 2010
@@ -132,6 +132,13 @@ public interface IOutputConnector extend
   public void removeDocument(String documentURI, String outputDescription, IOutputRemoveActivity activities)
     throws ACFException, ServiceInterruption;
 
+  /** Notify the connector of a completed job.
+  * This is meant to allow the connector to flush any internal data structures it has been keeping around, or to tell the output repository that this
+  * is a good time to synchronize things.  It is called whenever a job is either completed or aborted.  A call that throws may be retried later, so implementations should tolerate repeated calls for the same job.
+  */
+  public void noteJobComplete()
+    throws ACFException, ServiceInterruption;
+
   // UI support methods.
   //
   // These support methods come in two varieties.  The first bunch (inherited from IConnector) is involved in setting up connection configuration information.

Modified: incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java (original)
+++ incubator/lcf/trunk/modules/framework/agents/src/main/java/org/apache/acf/agents/output/BaseOutputConnector.java Tue Aug 31 19:12:14 2010
@@ -69,6 +69,16 @@ public abstract class BaseOutputConnecto
     throw new ACFException("Unrecognized output connector command '"+command+"'");
   }
 
+  /** Notify the connector of a completed job.
+  * This is meant to allow the connector to flush any internal data structures it has been keeping around, or to tell the output repository that this
+  * is a good time to synchronize things.  It is called whenever a job is either completed or aborted.  A call that throws may be retried later, so implementations should tolerate repeated calls for the same job.
+  */
+  public void noteJobComplete()
+    throws ACFException, ServiceInterruption
+  {
+    // Default is a no-op; connectors that buffer output, or whose repository needs a commit, override this.
+  }
+
   /** Detect if a mime type is indexable or not.  This method is used by participating repository connectors to pre-filter the number of
   * unusable documents that will be passed to this output connector.
   *@param mimeType is the mime type of the document.

Modified: incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp (original)
+++ incubator/lcf/trunk/modules/framework/crawler-ui/src/main/webapp/showjobstatus.jsp Tue Aug 31 19:12:14 2010
@@ -149,6 +149,9 @@ if (maintenanceUnderway == false)
 		case JobStatus.JOBSTATUS_JOBENDCLEANUP:
 			statusName = "Terminating";
 			break;
+		case JobStatus.JOBSTATUS_JOBENDNOTIFICATION:
+			statusName = "End notification";
+			break;
 		case JobStatus.JOBSTATUS_ERROR:
 			statusName = "Error: "+js.getErrorText();
 			break;

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/ListJobStatuses.java Tue Aug 31 19:12:14 2010
@@ -106,6 +106,8 @@ public class ListJobStatuses
       return "running no connector";
     case JobStatus.JOBSTATUS_JOBENDCLEANUP:
       return "terminating";
+    case JobStatus.JOBSTATUS_JOBENDNOTIFICATION:
+      return "notifying";
     default:
       return "unknown";
     }

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/IJobManager.java Tue Aug 31 19:12:14 2010
@@ -621,7 +621,18 @@ public interface IJobManager
   public JobStartRecord[] getJobsReadyForStartup()
     throws ACFException;
 
+  /** Find the list of jobs that need to have their connectors notified of job completion.
+  *@return the IDs of jobs that need their output connectors notified in order to become inactive.
+  */
+  public Long[] getJobsReadyForInactivity()
+    throws ACFException;
 
+  /** Inactivate a job, from the notification state, once its output connector has been notified successfully.
+  *@param jobID is the ID of the job to inactivate; it must currently be in the notification state.
+  */
+  public void inactivateJob(Long jobID)
+    throws ACFException;
+    
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */
@@ -740,16 +751,16 @@ public interface IJobManager
     throws ACFException;
 
   /** Put all eligible jobs in the "shutting down" state.
-  *@param finishList is filled in with the set of IJobDescription objects that were completed.
   */
-  public void finishJobs(ArrayList finishList)
+  public void finishJobs()
     throws ACFException;
 
   /** Reset eligible jobs back to "inactive" state.  This method is used to pick up all jobs in the shutting down state
   * whose purgatory records have been all cleaned up.
   *@param currentTime is the current time in milliseconds since epoch.
+  *@param resetJobs is filled in with the IJobDescription objects of the jobs that were reset; these jobs now await output connector notification before becoming inactive.
   */
-  public void resetJobs(long currentTime)
+  public void resetJobs(long currentTime, ArrayList resetJobs)
     throws ACFException;
 
 

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/interfaces/JobStatus.java Tue Aug 31 19:12:14 2010
@@ -38,6 +38,7 @@ public class JobStatus
   public final static int JOBSTATUS_RESTARTING = 9;
   public final static int JOBSTATUS_RUNNING_UNINSTALLED = 10;
   public final static int JOBSTATUS_JOBENDCLEANUP = 11;
+  public final static int JOBSTATUS_JOBENDNOTIFICATION = 12;
 
 
   // Member variables.

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/JobManager.java Tue Aug 31 19:12:14 2010
@@ -4807,6 +4807,65 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Inactivate a job, from the notification state.
+  *@param jobID is the ID of the job to inactivate; it must currently be in the notification state, otherwise ACFException is thrown.
+  */
+  public void inactivateJob(Long jobID)
+    throws ACFException
+  {
+    // No normal flow should call this with the job in any other state, but as a precaution the status is re-read
+    // under a row lock (SELECT ... FOR UPDATE) inside a transaction, and the whole step is retried if the transaction aborts.
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Lock the job row and read its current status.
+        ArrayList list = new ArrayList();
+        list.add(jobID);
+        IResultSet set = database.performQuery("SELECT "+jobs.statusField+" FROM "+jobs.getTableName()+
+          " WHERE "+jobs.idField+"=? FOR UPDATE",list,null,null);
+        if (set.getRowCount() == 0)
+          throw new ACFException("No such job: "+jobID);
+        IResultRow row = set.getRow(0);
+        int status = jobs.stringToStatus((String)row.getValue(jobs.statusField));
+        // Only a job still awaiting connector notification may be inactivated.
+        switch (status)
+        {
+        case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
+          jobs.notificationComplete(jobID);
+          break;
+        default:
+          throw new ACFException("Unexpected job status: "+Integer.toString(status));
+        }
+        return;
+      }
+      catch (ACFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted clearing notification state for job: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Reset a starting job back to "ready for startup" state.
   *@param jobID is the job id.
   */
@@ -5149,9 +5208,8 @@ public class JobManager implements IJobM
   }
 
   /** Put all eligible jobs in the "shutting down" state.
-  *@param finishList is filled in with the set of IJobDescription objects that were completed.
   */
-  public void finishJobs(ArrayList finishList)
+  public void finishJobs()
     throws ACFException
   {
     while (true)
@@ -5216,10 +5274,7 @@ public class JobManager implements IJobM
           if (confirmSet.getRowCount() > 0)
             continue;
 
-          IJobDescription jobDescription = jobs.load(jobID,true);
-
           // Mark status of job as "finishing"
-          finishList.add(jobDescription);
           jobs.writeStatus(jobID,jobs.STATUS_SHUTTINGDOWN);
           if (Logging.jobs.isDebugEnabled())
           {
@@ -5254,6 +5309,32 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Find the list of jobs that need to have their connectors notified of job completion.
+  *@return the IDs of jobs that need their output connectors notified in order to become inactive.
+  */
+  public Long[] getJobsReadyForInactivity()
+    throws ACFException
+  {
+    // Select every job whose status is "notifying of completion".
+    IResultSet set = database.performQuery("SELECT "+jobs.idField+" FROM "+
+      jobs.getTableName()+" WHERE "+jobs.statusField+"="+
+      database.quoteSQLString(jobs.statusToString(jobs.STATUS_NOTIFYINGOFCOMPLETION)),null,null,null);
+    // Return their IDs (an empty array when there are none).
+    Long[] rval = new Long[set.getRowCount()];
+    int i = 0;
+    while (i < rval.length)
+    {
+      IResultRow row = set.getRow(i);
+      Long jobID = (Long)row.getValue(jobs.idField);
+      rval[i++] = jobID;
+      if (Logging.jobs.isDebugEnabled())
+      {
+        Logging.jobs.debug("Found job "+jobID+" in need of notification");
+      }
+    }
+    return rval;
+  }
+  
   /** Complete the sequence that aborts jobs and makes them runnable again.
   *@param timestamp is the current time.
   *@param abortJobs is the set of IJobDescription objects that were aborted (and stopped).
@@ -5359,8 +5440,9 @@ public class JobManager implements IJobM
   /** Reset eligible jobs back to "inactive" state.  This method is used to pick up all jobs in the shutting down state
   * whose purgatory or being-deleted records have been all cleaned up.
   *@param currentTime is the current time in milliseconds since epoch.
+  *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
   */
-  public void resetJobs(long currentTime)
+  public void resetJobs(long currentTime, ArrayList resetJobs)
     throws ACFException
   {
     while (true)
@@ -5403,7 +5485,9 @@ public class JobManager implements IJobM
           if (confirmSet.getRowCount() > 0)
             continue;
 
-
+          IJobDescription jobDesc = jobs.load(jobID,true);
+          resetJobs.add(jobDesc);
+          
           // Label the job "finished"
           jobs.finishJob(jobID,currentTime);
           if (Logging.jobs.isDebugEnabled())
@@ -5630,6 +5714,9 @@ public class JobManager implements IJobM
       case Jobs.STATUS_SHUTTINGDOWN:
         rstatus = JobStatus.JOBSTATUS_JOBENDCLEANUP;
         break;
+      case Jobs.STATUS_NOTIFYINGOFCOMPLETION:
+        rstatus = JobStatus.JOBSTATUS_JOBENDNOTIFICATION;
+        break;
       case Jobs.STATUS_ABORTING:
       case Jobs.STATUS_ABORTINGSEEDING:
       case Jobs.STATUS_ABORTINGSTARTINGUP:

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/jobs/Jobs.java Tue Aug 31 19:12:14 2010
@@ -33,24 +33,25 @@ public class Jobs extends org.apache.acf
   // Status field values
   public static final int STATUS_INACTIVE = 0;                            // Not running
   public static final int STATUS_ACTIVE = 1;                              // Active, within a valid window
-  public static final int STATUS_PAUSED = 2;                              // Paused, but within a valid window
-  public static final int STATUS_SHUTTINGDOWN = 3;                // Done, except for process cleanup
-  public static final int STATUS_ACTIVEWAIT = 4;                  // Active, but paused due to window expiration
-  public static final int STATUS_PAUSEDWAIT = 5;                  // Paused, and outside of window expiration
-  public static final int STATUS_ABORTING = 6;                            // Aborting (not yet aborted because documents still being processed)
-  public static final int STATUS_STARTINGUP = 7;                  // Loading the queue (will go into ACTIVE if successful, or INACTIVE if not)
-  public static final int STATUS_ABORTINGSTARTINGUP = 8;  // Will abort once the queue loading is complete
+  public static final int STATUS_PAUSED = 2;                             // Paused, but within a valid window
+  public static final int STATUS_SHUTTINGDOWN = 3;                  // Done, except for process cleanup
+  public static final int STATUS_ACTIVEWAIT = 4;                        // Active, but paused due to window expiration
+  public static final int STATUS_PAUSEDWAIT = 5;                        // Paused, and outside of window expiration
+  public static final int STATUS_ABORTING = 6;                          // Aborting (not yet aborted because documents still being processed)
+  public static final int STATUS_STARTINGUP = 7;                        // Loading the queue (will go into ACTIVE if successful, or INACTIVE if not)
+  public static final int STATUS_ABORTINGSTARTINGUP = 8;        // Will abort once the queue loading is complete
   public static final int STATUS_READYFORSTARTUP = 9;             // Job is marked for startup; startup thread has not taken it yet.
   public static final int STATUS_READYFORDELETE = 10;             // Job is marked for delete; delete thread has not taken it yet.
   public static final int STATUS_ACTIVESEEDING = 11;              // Same as active, but seeding process is currently active also.
   public static final int STATUS_ABORTINGSEEDING = 12;            // Same as aborting, but seeding process is currently active also.
   public static final int STATUS_PAUSEDSEEDING = 13;              // Same as paused, but seeding process is currently active also.
-  public static final int STATUS_ACTIVEWAITSEEDING = 14;  // Same as active wait, but seeding process is currently active also.
-  public static final int STATUS_PAUSEDWAITSEEDING = 15;  // Same as paused wait, but seeding process is currently active also.
-  public static final int STATUS_ABORTINGFORRESTART = 16; // Same as aborting, except after abort is complete startup will happen.
+  public static final int STATUS_ACTIVEWAITSEEDING = 14;        // Same as active wait, but seeding process is currently active also.
+  public static final int STATUS_PAUSEDWAITSEEDING = 15;        // Same as paused wait, but seeding process is currently active also.
+  public static final int STATUS_ABORTINGFORRESTART = 16;       // Same as aborting, except after abort is complete startup will happen.
   public static final int STATUS_ABORTINGFORRESTARTSEEDING = 17;  // Seeding version of aborting for restart
   public static final int STATUS_ABORTINGSTARTINGUPFORRESTART = 18; // Starting up version of aborting for restart
-
+  public static final int STATUS_NOTIFYINGOFCOMPLETION = 19;    // Notifying connector of terminating job (either aborted, or finished)
+  
   // These statuses have to do with whether a job has an installed underlying connector or not.
   // There are two reasons to have a special state here: (1) if the behavior of the crawler differs, or (2) if the
   // UI would present something different.
@@ -135,6 +136,7 @@ public class Jobs extends org.apache.acf
     statusMap.put("A",new Integer(STATUS_ACTIVE));
     statusMap.put("P",new Integer(STATUS_PAUSED));
     statusMap.put("S",new Integer(STATUS_SHUTTINGDOWN));
+    statusMap.put("s",new Integer(STATUS_NOTIFYINGOFCOMPLETION));
     statusMap.put("W",new Integer(STATUS_ACTIVEWAIT));
     statusMap.put("Z",new Integer(STATUS_PAUSEDWAIT));
     statusMap.put("X",new Integer(STATUS_ABORTING));
@@ -1635,7 +1637,7 @@ public class Jobs extends org.apache.acf
     ArrayList list = new ArrayList();
     list.add(jobID);
     HashMap map = new HashMap();
-    map.put(statusField,statusToString(STATUS_INACTIVE));
+    map.put(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION));
     map.put(errorField,null);
     map.put(endTimeField,new Long(finishTime));
     map.put(lastTimeField,new Long(finishTime));
@@ -1654,7 +1656,7 @@ public class Jobs extends org.apache.acf
     ArrayList list = new ArrayList();
     list.add(jobID);
     HashMap map = new HashMap();
-    map.put(statusField,statusToString(STATUS_INACTIVE));
+    map.put(statusField,statusToString(STATUS_NOTIFYINGOFCOMPLETION));
     map.put(endTimeField,null);
     map.put(lastTimeField,new Long(abortTime));
     map.put(windowEndField,null);
@@ -1663,6 +1665,20 @@ public class Jobs extends org.apache.acf
     performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
   }
 
+  /** Mark job as having properly notified the output connector of completion, by moving it to the inactive state.
+  *@param jobID is the job id.
+  */
+  public void notificationComplete(Long jobID)
+    throws ACFException
+  {
+    ArrayList list = new ArrayList();
+    list.add(jobID);
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_INACTIVE));
+    // Only the status changes; everything recorded by the finish/abort update (e.g. end time, last time) is left intact.
+    performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
+  }
+  
   /** See if there's a reference to a connection name.
   *@param connectionName is the name of the connection.
   *@return true if there is a reference, false otherwise.
@@ -1807,6 +1823,8 @@ public class Jobs extends org.apache.acf
       return "P";
     case STATUS_SHUTTINGDOWN:
       return "S";
+    case STATUS_NOTIFYINGOFCOMPLETION:
+      return "s";
     case STATUS_ACTIVEWAIT:
       return "W";
     case STATUS_PAUSEDWAIT:

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/ACF.java Tue Aug 31 19:12:14 2010
@@ -38,6 +38,7 @@ public class ACF extends org.apache.acf.
   protected static JobStartThread jobStartThread = null;
   protected static StufferThread stufferThread = null;
   protected static FinisherThread finisherThread = null;
+  protected static JobNotificationThread notificationThread = null;
   protected static StartupThread startupThread = null;
   protected static JobDeleteThread jobDeleteThread = null;
   protected static WorkerThread[] workerThreads = null;
@@ -189,6 +190,7 @@ public class ACF extends org.apache.acf.
       jobStartThread = new JobStartThread();
       startupThread = new StartupThread(queueTracker);
       finisherThread = new FinisherThread();
+      notificationThread = new JobNotificationThread();
       jobDeleteThread = new JobDeleteThread();
       stufferThread = new StufferThread(documentQueue,numWorkerThreads,workerResetManager,queueTracker,blockingDocuments,lowWaterFactor,stuffAmtFactor);
       expireStufferThread = new ExpireStufferThread(expireQueue,numExpireThreads,workerResetManager);
@@ -289,6 +291,7 @@ public class ACF extends org.apache.acf.
         jobStartThread.start();
         startupThread.start();
         finisherThread.start();
+        notificationThread.start();
         jobDeleteThread.start();
         stufferThread.start();
         expireStufferThread.start();
@@ -346,7 +349,7 @@ public class ACF extends org.apache.acf.
     synchronized (startupLock)
     {
       while (initializationThread != null || jobDeleteThread != null || startupThread != null || jobStartThread != null || stufferThread != null ||
-        finisherThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
+        finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
         deleteStufferThread != null || deleteThreads != null ||
         jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null)
       {
@@ -385,6 +388,10 @@ public class ACF extends org.apache.acf.
         {
           finisherThread.interrupt();
         }
+        if (notificationThread != null)
+        {
+          notificationThread.interrupt();
+        }
         if (workerThreads != null)
         {
           int i = 0;
@@ -482,6 +489,11 @@ public class ACF extends org.apache.acf.
           if (!finisherThread.isAlive())
             finisherThread = null;
         }
+        if (notificationThread != null)
+        {
+          if (!notificationThread.isAlive())
+            notificationThread = null;
+        }
         if (workerThreads != null)
         {
           int i = 0;
@@ -2514,6 +2526,8 @@ public class ACF extends org.apache.acf.
       return "running no connector";
     case JobStatus.JOBSTATUS_JOBENDCLEANUP:
       return "terminating";
+    case JobStatus.JOBSTATUS_JOBENDNOTIFICATION:
+      return "notifying";
     default:
       return "unknown";
     }

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/FinisherThread.java Tue Aug 31 19:12:14 2010
@@ -61,16 +61,7 @@ public class FinisherThread extends Thre
         {
           Logging.threads.debug("Cleaning up completed jobs...");
           // See if there are any completed jobs
-          ArrayList doneJobs = new ArrayList();
-          jobManager.finishJobs(doneJobs);
-          int k = 0;
-          while (k < doneJobs.size())
-          {
-            IJobDescription desc = (IJobDescription)doneJobs.get(k++);
-            connectionManager.recordHistory(desc.getConnectionName(),
-              null,connectionManager.ACTIVITY_JOBEND,null,
-              desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
-          }
+          jobManager.finishJobs();
           Logging.threads.debug("Done cleaning up completed jobs");
           ACF.sleep(10000L);
         }

Added: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java?rev=991295&view=auto
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java (added)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java Tue Aug 31 19:12:14 2010
@@ -0,0 +1,172 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.acf.crawler.system;
+
+import org.apache.acf.core.interfaces.*;
+import org.apache.acf.agents.interfaces.*;
+import org.apache.acf.crawler.interfaces.*;
+import org.apache.acf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This class represents the thread that finds jobs waiting to notify their output connector of completion, calls noteJobComplete()
+* on each such job's output connector, and then inactivates the job.  A job whose notification fails is not inactivated on that pass.
+*/
+public class JobNotificationThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Constructor.  Names the thread and marks it as a daemon thread, so it does not by itself keep the JVM running.
+  */
+  public JobNotificationThread()
+    throws ACFException
+  {
+    super();
+    setName("Job notification thread");
+    setDaemon(true);
+  }
+  /** Repeatedly notify output connectors of completed jobs and inactivate those jobs, until the thread is interrupted. */
+  public void run()
+  {
+    try
+    {
+      // Create a thread context and the managers used below; a failure here reaches the outer catch and exits the process.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      IJobManager jobManager = JobManagerFactory.make(threadContext);
+      IOutputConnectionManager connectionManager = OutputConnectionManagerFactory.make(threadContext);
+
+      // Loop until interrupted (an INTERRUPTED error code or an InterruptedException breaks out).
+      while (true)
+      {
+        // Per-pass try/catch: non-fatal errors are logged below and the loop continues with the next pass.
+        try
+        {
+          Long[] jobsNeedingNotification = jobManager.getJobsReadyForInactivity();
+          
+          int k = 0;
+          while (k < jobsNeedingNotification.length)
+          {
+            Long jobID = jobsNeedingNotification[k++];
+            IJobDescription job = jobManager.load(jobID,true);
+            if (job != null)
+            {
+              // Look up the job's output connection.  NOTE(review): if the connection or connector is unavailable, the job is skipped without logging.
+              String connectionName = job.getOutputConnectionName();
+              IOutputConnection connection = connectionManager.load(connectionName);
+              if (connection != null)
+              {
+                // Grab a connector instance for this connection; it is released in the finally block below.
+                IOutputConnector connector = OutputConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+                if (connector != null)
+                {
+                  try
+                  {
+                    // Do the notification itself; on failure, skip inactivation so the job is not inactivated on this pass.
+                    try
+                    {
+                      connector.noteJobComplete();
+                    }
+                    catch (ServiceInterruption e)
+                    {
+                      Logging.threads.warn("Service interruption notifying connection - retrying: "+e.getMessage(),e);
+                      continue; // skip inactivation; the enclosing finally still releases the connector
+                    }
+                    catch (ACFException e)
+                    {
+                      if (e.getErrorCode() == ACFException.INTERRUPTED)
+                        throw e;
+                      if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
+                        throw e;
+                      if (e.getErrorCode() == ACFException.SETUP_ERROR)
+                        throw e;
+                      // Other errors are not fatal here: log them and skip this job's inactivation for now.
+                      Logging.threads.error(e.getMessage(),e);
+                      continue;
+                    }
+                    // Notification succeeded: put the job into the Inactive state.
+                    jobManager.inactivateJob(jobID);
+                  }
+                  finally
+                  {
+                    OutputConnectorFactory.release(connector);
+                  }
+                }
+              }
+            }
+          }
+          ACF.sleep(10000L);
+        }
+        catch (ACFException e)
+        {
+          if (e.getErrorCode() == ACFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ACFException.DATABASE_CONNECTION_ERROR)
+          {
+            Logging.threads.error("Job notification thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ACF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it; the thread stays alive unless this is a setup error (handled just below).
+          Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ACFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("agents process ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error outside the per-pass handlers, normally during initialization above; fatal.
+      System.err.println("agents process could not start - shutting down");
+      Logging.threads.fatal("JobNotificationThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+  }
+
+}

Propchange: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobNotificationThread.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java?rev=991295&r1=991294&r2=991295&view=diff
==============================================================================
--- incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java (original)
+++ incubator/lcf/trunk/modules/framework/pull-agent/src/main/java/org/apache/acf/crawler/system/JobResetThread.java Tue Aug 31 19:12:14 2010
@@ -25,8 +25,8 @@ import org.apache.acf.crawler.system.Log
 import java.util.*;
 import java.lang.reflect.*;
 
-/** This class represents the thread that notices jobs that have completed their shutdown phase, and resets them back to
-* inactive.
+/** This class represents the thread that notices jobs that have completed their shutdown phase, and puts them in the
+* "notify connector" state.
 */
 public class JobResetThread extends Thread
 {
@@ -73,8 +73,17 @@ public class JobResetThread extends Thre
               null,connectionManager.ACTIVITY_JOBABORT,null,
               desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
           }
-          jobManager.resetJobs(currentTime);
-
+          ArrayList jobCompletions = new ArrayList();
+          jobManager.resetJobs(currentTime,jobCompletions);
+          k = 0;
+          while (k < jobCompletions.size())
+          {
+            IJobDescription desc = (IJobDescription)jobCompletions.get(k++);
+            connectionManager.recordHistory(desc.getConnectionName(),
+              null,connectionManager.ACTIVITY_JOBEND,null,
+              desc.getID().toString()+"("+desc.getDescription()+")",null,null,null);
+          }
+          
           // If there were any job aborts, we must reprioritize all active documents, since we've done something
           // not predicted by the algorithm that assigned those priorities.  This is, of course, quite expensive,
           // but it cannot be helped (at least, I cannot find a way to avoid it).