You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/17 14:54:53 UTC

svn commit: r1551545 - in /manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler: IThrottledConnection.java ThrottledFetcher.java

Author: kwright
Date: Tue Dec 17 13:54:53 2013
New Revision: 1551545

URL: http://svn.apache.org/r1551545
Log:
Finish new connection pool code.

Modified:
    manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
    manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java

Modified: manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java?rev=1551545&r1=1551544&r2=1551545&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java (original)
+++ manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java Tue Dec 17 13:54:53 2013
@@ -39,6 +39,11 @@ public interface IThrottledConnection
   public static final int FETCH_INTERRUPTED = -104;
   public static final int FETCH_UNKNOWN_ERROR = -999;
 
+  /** Check whether the connection has expired.
+  *@return true if the connection has expired, and should be closed; false otherwise.
+  */
+  public boolean hasExpired();
+
   /** Begin the fetch process.
   * @param fetchType is a short descriptive string describing the kind of fetch being requested.  This
   *        is used solely for logging purposes.
@@ -115,6 +120,5 @@ public interface IThrottledConnection
 
   /** Close the connection.  Call this to end this server connection.
   */
-  public void close()
-    throws ManifoldCFException;
+  public void close();
 }

Modified: manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java?rev=1551545&r1=1551544&r2=1551545&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java (original)
+++ manifoldcf/branches/CONNECTORS-829/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java Tue Dec 17 13:54:53 2013
@@ -1030,6 +1030,17 @@ public class ThrottledFetcher
 
     }
 
+    /** Check whether the connection has expired.
+    *@return true if the connection has expired, and should be closed.  This implementation always returns false.
+    */
+    @Override
+    public boolean hasExpired()
+    {
+      // Right now, they never expire, so this method never asks for the connection to be closed.
+      // MHL (presumably "more here later": real expiration logic is still to be written -- confirm)
+      return false;
+    }
+
     public void mustHaveReference(ConnectionBin cb)
     {
       int i = 0;
@@ -1944,7 +1955,6 @@ public class ThrottledFetcher
     */
     @Override
     public void close()
-      throws ManifoldCFException
     {
       synchronized (poolLock)
       {
@@ -2736,4 +2746,235 @@ public class ThrottledFetcher
 
   }
 
+  /** Connection pool key.
+  * Two keys are equal only when the protocol, server, port, page credentials, trust store string,
+  * and every proxy setting match.  All fields are final, and hashCode() is consistent with equals().
+  */
+  protected static class ConnectionPoolKey
+  {
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final String trustStoreString;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+
+    /** Constructor.  The values are stored as given; none of them is validated or copied.
+    *@param protocol is the connection protocol.
+    *@param server is the target server name.
+    *@param port is the target server port.
+    *@param authentication is the page credentials object (a null value is tolerated by this class).
+    *@param trustStoreString is the trust store string (a null value is tolerated by this class).
+    *@param proxyHost is the proxy host, or null.
+    *@param proxyPort is the proxy port.
+    *@param proxyAuthDomain is the proxy authentication domain, or null.
+    *@param proxyAuthUsername is the proxy authentication user name, or null.
+    *@param proxyAuthPassword is the proxy authentication password, or null.
+    */
+    public ConnectionPoolKey(String protocol,
+      String server, int port, PageCredentials authentication,
+      String trustStoreString, String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.trustStoreString = trustStoreString;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+
+    /** Compute a hash code from every field that equals() compares.
+    * Null fields contribute zero, so a key holding null values can still be hashed.
+    *@return the hash code.
+    */
+    @Override
+    public int hashCode()
+    {
+      return hashOf(protocol) +
+        hashOf(server) +
+        (port * 31) +
+        hashOf(authentication) +
+        hashOf(trustStoreString) +
+        hashOf(proxyHost) +
+        (proxyPort * 29) +
+        hashOf(proxyAuthDomain) +
+        hashOf(proxyAuthUsername) +
+        hashOf(proxyAuthPassword);
+    }
+
+    /** Compare this key against another object.
+    * The protocol takes part in the comparison, matching hashCode(); without it, keys that differ
+    * only by protocol would compare equal and end up sharing a single pool.
+    *@param o is the object to compare against.
+    *@return true if o is a ConnectionPoolKey whose fields all match this key's fields.
+    */
+    @Override
+    public boolean equals(Object o)
+    {
+      if (!(o instanceof ConnectionPoolKey))
+        return false;
+      ConnectionPoolKey other = (ConnectionPoolKey)o;
+      return port == other.port && proxyPort == other.proxyPort &&
+        same(protocol,other.protocol) &&
+        same(server,other.server) &&
+        same(authentication,other.authentication) &&
+        same(trustStoreString,other.trustStoreString) &&
+        same(proxyHost,other.proxyHost) &&
+        same(proxyAuthDomain,other.proxyAuthDomain) &&
+        same(proxyAuthUsername,other.proxyAuthUsername) &&
+        same(proxyAuthPassword,other.proxyAuthPassword);
+    }
+
+    /** Null-safe hash: zero for null, otherwise the object's own hash code. */
+    private static int hashOf(Object value)
+    {
+      return (value == null)?0:value.hashCode();
+    }
+
+    /** Null-safe equality: two nulls match, and a null never matches a non-null value. */
+    private static boolean same(Object a, Object b)
+    {
+      return (a == null)?(b == null):a.equals(b);
+    }
+  }
+  
+  /** Each connection pool has identical connections we can draw on.
+  * All access to the connection list is synchronized on the list itself.
+  */
+  protected static class ConnectionPool
+  {
+    /** Throttler */
+    protected final IConnectionThrottler connectionThrottler;
+
+    // If we need to create a connection, these are what we use
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final javax.net.ssl.SSLSocketFactory baseFactory;
+    protected final String trustStoreString;
+    protected final ConnectionBin[] bins;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+
+    /** The actual pool of connections; release() appends, so the oldest entries are at the front */
+    protected final List<IThrottledConnection> connections = new ArrayList<IThrottledConnection>();
+
+    /** Constructor.  connectionThrottler is consulted on every grab, release, and flush; all other
+    * arguments are stored and passed unchanged to each ThrottledConnection that grab() creates.
+    */
+    public ConnectionPool(IConnectionThrottler connectionThrottler,
+      String protocol,
+      String server, int port, PageCredentials authentication,
+      javax.net.ssl.SSLSocketFactory baseFactory, String trustStoreString, ConnectionBin[] bins,
+      String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.connectionThrottler = connectionThrottler;
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.baseFactory = baseFactory;
+      this.trustStoreString = trustStoreString;
+      this.bins = bins;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+
+    /** Obtain a connection, first asking the throttler whether to reuse a pooled one or create one.
+    *@return a connection taken from the end of the pool, or a newly created ThrottledConnection.
+    *@throws InterruptedException as propagated from the throttler's waitConnectionAvailable().
+    */
+    public IThrottledConnection grab()
+      throws InterruptedException
+    {
+      int result = connectionThrottler.waitConnectionAvailable();
+      if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+      {
+        // We are guaranteed to have a connection in the pool, unless there's a coding error.
+        synchronized (connections)
+        {
+          return connections.remove(connections.size()-1);
+        }
+      }
+      if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
+        return new ThrottledConnection(protocol,server,port,authentication,baseFactory,trustStoreString,bins,
+          proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+      throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
+    }
+
+    /** Give back a connection previously obtained from grab().
+    * The throttler decides whether the connection is destroyed or goes back into the pool.
+    *@param connection is the connection being released.
+    */
+    public void release(IThrottledConnection connection)
+    {
+      if (connectionThrottler.noteReturnedConnection())
+        destroy(connection);
+      else
+      {
+        // Return to pool
+        synchronized (connections)
+        {
+          connections.add(connection);
+        }
+        connectionThrottler.noteConnectionReturnedToPool();
+      }
+    }
+
+    /** Destroy idle pooled connections: first those over quota, then those that have expired.
+    * The expiration pass ends as soon as the throttler declines an expiration.
+    */
+    public void flushIdleConnections()
+    {
+      // First, remove connections that are over the quota; destroy the oldest ones first
+      while (connectionThrottler.checkDestroyPooledConnection())
+        destroy(removeOldest());
+      // Now, get rid of expired connections
+      while (true)
+      {
+        boolean expired;
+        synchronized (connections)
+        {
+          expired = connections.size() > 0 && connections.get(0).hasExpired();
+        }
+        // Stop if the throttler disagrees; looping again would only re-test the same connection.
+        if (!expired || !connectionThrottler.checkExpireConnection())
+          break;
+        destroy(removeOldest());
+      }
+    }
+
+    /** Remove and return the oldest pooled connection (the front of the list). */
+    private IThrottledConnection removeOldest()
+    {
+      synchronized (connections)
+      {
+        return connections.remove(0);
+      }
+    }
+
+    /** Close a connection and tell the throttler it has been destroyed. */
+    private void destroy(IThrottledConnection connection)
+    {
+      connection.close();
+      connectionThrottler.noteConnectionDestroyed();
+    }
+  }
 }