You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/10/07 13:06:44 UTC

svn commit: r1180003 - in /incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki: PageBuffer.java WikiConnector.java

Author: kwright
Date: Fri Oct  7 11:06:43 2011
New Revision: 1180003

URL: http://svn.apache.org/viewvc?rev=1180003&view=rev
Log:
Flesh out method that lists pages

Modified:
    incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/PageBuffer.java
    incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/WikiConnector.java

Modified: incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/PageBuffer.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/PageBuffer.java?rev=1180003&r1=1180002&r2=1180003&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/PageBuffer.java (original)
+++ incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/PageBuffer.java Fri Oct  7 11:06:43 2011
@@ -18,7 +18,6 @@
 */
 package org.apache.manifoldcf.crawler.connectors.wiki;
 
-import org.apache.manifoldcf.core.interfaces.*;
 import java.util.*;
 
 /** Thread-safe class that functions as a limited-size buffer of pageIDs */
@@ -37,19 +36,10 @@ public class PageBuffer
   
   /** Add a page id to the buffer, and block if the buffer is full */
   public synchronized void add(String pageID)
-    throws ManifoldCFException
+    throws InterruptedException
   {
-    try
-    {
-      while (buffer.size() == MAX_SIZE)
-      {
-        wait();
-      }
-    }
-    catch (InterruptedException e)
-    {
-      throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
-    }
+    while (buffer.size() == MAX_SIZE)
+      wait();
     buffer.add(pageID);
     // Notify threads that are waiting on there being stuff in the queue
     notifyAll();
@@ -69,17 +59,10 @@ public class PageBuffer
   * Returns null if the operation is complete.
   */
   public synchronized String fetch()
-    throws ManifoldCFException
+    throws InterruptedException
   {
-    try
-    {
-      while (buffer.size() == 0 && !complete)
-        wait();
-    }
-    catch (InterruptedException e)
-    {
-      throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
-    }
+    while (buffer.size() == 0 && !complete)
+      wait();
     if (buffer.size() == 0)
       return null;
     boolean isBufferFull = (buffer.size() == MAX_SIZE);

Modified: incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/WikiConnector.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/WikiConnector.java?rev=1180003&r1=1180002&r2=1180003&view=diff
==============================================================================
--- incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/WikiConnector.java (original)
+++ incubator/lcf/branches/CONNECTORS-256/connectors/wiki/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/wiki/WikiConnector.java Fri Oct  7 11:06:43 2011
@@ -599,7 +599,15 @@ public class WikiConnector extends org.a
         Throwable thr = t.getException();
         if (thr != null)
         {
-          if (thr instanceof IOException)
+          if (thr instanceof ManifoldCFException)
+          {
+            if (((ManifoldCFException)thr).getErrorCode() == ManifoldCFException.INTERRUPTED)
+              throw new InterruptedException(thr.getMessage());
+            throw (ManifoldCFException)thr;
+          }
+          else if (thr instanceof ServiceInterruption)
+            throw (ServiceInterruption)thr;
+          else if (thr instanceof IOException)
             throw (IOException)thr;
           else if (thr instanceof RuntimeException)
             throw (RuntimeException)thr;
@@ -607,6 +615,21 @@ public class WikiConnector extends org.a
             throw (Error)thr;
         }
       }
+      catch (ManifoldCFException e)
+      {
+        t.interrupt();
+        throw e;
+      }
+      catch (ServiceInterruption e)
+      {
+        t.interrupt();
+        throw e;
+      }
+      catch (IOException e)
+      {
+        t.interrupt();
+        throw e;
+      }
       catch (InterruptedException e)
       {
         t.interrupt();
@@ -943,6 +966,176 @@ public class WikiConnector extends org.a
     }
   }
 
+  /** Run a list-pages request on a background ExecuteListPagesThread, pass every page id fetched from the PageBuffer to activities.addSeedDocument(), and return the thread's last page title.  InterruptedException is rethrown as ManifoldCFException(INTERRUPTED), socket/connect timeouts as ServiceInterruption, and other IOExceptions as ManifoldCFException. */
+  protected String executeListPagesViaThread(String startPageTitle, ISeedingActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    HttpClient client = getInitializedClient();
+    HttpMethodBase executeMethod = getInitializedMethod(getListPagesURL(startPageTitle));
+    try
+    {
+      PageBuffer pageBuffer = new PageBuffer();
+      ExecuteListPagesThread t = new ExecuteListPagesThread(client,executeMethod,pageBuffer);
+      try
+      {
+        t.start();
+
+        // Drain page ids from the buffer into the activities while the child thread is still running; join with it only after fetch() returns null.
+        while (true)
+        {
+          // fetch() waits while the buffer is empty and not complete, returns null once it is complete and drained, and can only throw InterruptedException.
+          String pageID = pageBuffer.fetch();
+          if (pageID ==  null)
+            break;
+          // Register the pageID as a seed document
+          activities.addSeedDocument(pageID);
+        }
+        
+        t.join();
+        Throwable thr = t.getException(); // Whatever the child thread's run() caught, rethrown below with its original type
+        if (thr != null)
+        {
+          if (thr instanceof ManifoldCFException)
+          {
+            if (((ManifoldCFException)thr).getErrorCode() == ManifoldCFException.INTERRUPTED)
+              throw new InterruptedException(thr.getMessage()); // Convert back so the outer InterruptedException handler abandons the connection
+            throw (ManifoldCFException)thr;
+          }
+          else if (thr instanceof ServiceInterruption)
+            throw (ServiceInterruption)thr;
+          else if (thr instanceof IOException)
+            throw (IOException)thr;
+          else if (thr instanceof RuntimeException)
+            throw (RuntimeException)thr;
+          else
+            throw (Error)thr; // NOTE(review): assumes anything left is an Error; any other checked Throwable would fail this cast with ClassCastException
+        }
+        return t.getLastPageTitle();
+      }
+      catch (ManifoldCFException e)
+      {
+        t.interrupt();
+        throw e;
+      }
+      catch (ServiceInterruption e)
+      {
+        t.interrupt();
+        throw e;
+      }
+      catch (IOException e)
+      {
+        t.interrupt();
+        throw e;
+      }
+      catch (InterruptedException e)
+      {
+        t.interrupt();
+        // We need the caller to abandon any connections left around, so rethrow in a way that forces them to process the event properly.
+        throw e;
+      }
+    }
+    catch (InterruptedException e)
+    {
+      // Null the method so the finally block skips releaseConnection() and the connection is simply abandoned
+      executeMethod = null;
+      throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    }
+    catch (java.net.SocketTimeoutException e)
+    {
+      long currentTime = System.currentTimeMillis(); // NOTE(review): the offsets below are 5 and 12 minutes from now; presumably retry and give-up times - confirm against the ServiceInterruption constructor
+      throw new ServiceInterruption("ListPages timed out reading from the Wiki server: "+e.getMessage(),e,currentTime+300000L,currentTime+12L * 60000L,-1,false);
+    }
+    catch (java.net.SocketException e)
+    {
+      long currentTime = System.currentTimeMillis();
+      throw new ServiceInterruption("ListPages received a socket error reading from Wiki server: "+e.getMessage(),e,currentTime+300000L,currentTime+12L * 60000L,-1,false);
+    }
+    catch (org.apache.commons.httpclient.ConnectTimeoutException e)
+    {
+      long currentTime = System.currentTimeMillis();
+      throw new ServiceInterruption("ListPages connection timed out reading from Wiki server: "+e.getMessage(),e,currentTime+300000L,currentTime+12L * 60000L,-1,false);
+    }
+    catch (InterruptedIOException e)
+    {
+      executeMethod = null; // Same as the InterruptedException case: skip releaseConnection() in the finally block
+      throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+    }
+    catch (IOException e)
+    {
+      throw new ManifoldCFException("ListPages had an IO failure: "+e.getMessage(),e);
+    }
+    finally
+    {
+      if (executeMethod != null)
+        executeMethod.releaseConnection();
+    }
+  }
+  
+  /** Daemon thread that executes the list-pages HTTP method, hands the response stream and the page buffer to parseListPagesResponse(), records any Throwable for getException(), and always calls pageBuffer.signalDone() before exiting. */
+  protected static class ExecuteListPagesThread extends Thread
+  {
+    protected HttpClient client;
+    protected HttpMethodBase executeMethod;
+    protected Throwable exception = null;
+    protected PageBuffer pageBuffer;
+    protected String lastPageTitle = null;
+
+    public ExecuteListPagesThread(HttpClient client, HttpMethodBase executeMethod, PageBuffer pageBuffer)
+    {
+      super();
+      setDaemon(true);
+      this.client = client;
+      this.executeMethod = executeMethod;
+      this.pageBuffer = pageBuffer;
+    }
+
+    public void run()
+    {
+      try
+      {
+        // Issue the HTTP request; any status other than 200 is treated as a failure
+        int rval = client.executeMethod(executeMethod);
+        if (rval != 200)
+          throw new ManifoldCFException("Unexpected response code: "+rval);
+        // Parse the response body; the parser is handed pageBuffer and its return value becomes lastPageTitle
+        InputStream is = executeMethod.getResponseBodyAsStream(); // NOTE(review): may be null when the response has no body - confirm; is.close() below would then throw NullPointerException
+        try
+        {
+          lastPageTitle = parseListPagesResponse(is,pageBuffer);
+        }
+        finally
+        {
+          try
+          {
+            is.close();
+          }
+          catch (IllegalStateException e)
+          {
+            // Ignore this error - NOTE(review): presumably raised when the connection was already released or aborted; confirm
+          }
+        }
+      }
+      catch (Throwable e)
+      {
+        this.exception = e; // Captured so the parent thread can inspect it via getException() after join()
+      }
+      finally
+      {
+        pageBuffer.signalDone(); // Runs on success and failure alike; presumably marks the buffer complete so a consumer waiting in fetch() wakes up - confirm
+      }
+    }
+
+    public Throwable getException()
+    {
+      return exception;
+    }
+
+    public String getLastPageTitle()
+    {
+      return lastPageTitle;
+    }
+  }
+
   /** Parse list output, e.g.:
   * <api xmlns="http://www.mediawiki.org/xml/api/">
   *   <query>
@@ -980,6 +1173,8 @@ public class WikiConnector extends org.a
       }
       catch (ManifoldCFException e)
       {
+        if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+          throw e;
         // Ignore XML parsing errors.
         if (e.getMessage().indexOf("pars") >= 0)
         {
@@ -1120,7 +1315,14 @@ public class WikiConnector extends org.a
         lastTitle = atts.getValue("title");
         String pageID = atts.getValue("pageid");
         // Add the discovered page id to the page buffer
-        buffer.add(pageID);
+        try
+        {
+          buffer.add(pageID);
+        }
+        catch (InterruptedException e)
+        {
+          throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+        }
       }
       return super.beginTag(namespaceURI,localName,qName,atts);
     }