You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/04 23:38:21 UTC

svn commit: r1547943 - in /manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool: ./ ConnectorPool.java

Author: kwright
Date: Wed Dec  4 22:38:21 2013
New Revision: 1547943

URL: http://svn.apache.org/r1547943
Log:
Define a generic connector pool that we can include in connector pool implementations

Added:
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/
    manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java   (with props)

Added: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java?rev=1547943&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java (added)
+++ manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java Wed Dec  4 22:38:21 2013
@@ -0,0 +1,408 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.connectorpool;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+import java.util.*;
+import java.io.*;
+import java.lang.reflect.*;
+
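+/** This is the generic base class for connector pools.  It keeps one pool of connector
+* instances per distinct (class name, configuration) pair, and leaves the actual
+* instantiation of connectors to subclasses.
+*/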
+public abstract class ConnectorPool<T extends IConnector>
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // Pool hash table.
+  // Keyed by PoolKey; value is Pool
+  protected final Map<PoolKey,Pool> poolHash = new HashMap<PoolKey,Pool>();
+
+  protected ConnectorPool() // Nothing to set up here: poolHash is created with the instance and filled lazily by grab().
+  {
+  }
+
+  // Protected methods
+  
+  /** Override this method to instantiate a connector.
+  * Pool.getConnector() calls this when it has no idle connector to hand out, and
+  * then immediately calls connect() on the result with the pool key's configuration.
+  *@param tc is the thread context of the caller that is grabbing the connector.
+  *@param className is the connector class name recorded in the pool key.
+  *@return the new, not yet connected, connector instance.  Since connect() is invoked
+  * on it right away, implementations presumably must not return null (to be confirmed
+  * against the subclasses).
+  */
+  protected abstract T createConnectorInstance(IThreadContext tc, String className)
+    throws ManifoldCFException;
+  
+  /** Get multiple connectors, all at once.  The connectors are grabbed in sorted
+  * ordering-key order, so that every caller acquires pools in the same sequence and
+  * connector exhaustion cannot cause a deadlock.  If any grab fails, the connectors
+  * obtained so far are released again before the failure is rethrown.
+  *@param threadContext is the current thread context.
+  *@param clazz is the connector class, used to allocate the typed result array.
+  *@param orderingKeys are the unique keys that determine the acquisition order.
+  * This array is not modified.
+  *@param classNames are the connector class names, parallel to orderingKeys.
+  *@param configInfos are the connector configurations, parallel to orderingKeys.
+  *@param maxPoolSizes are the maximum pool sizes, parallel to orderingKeys.
+  *@return the connectors, each at the same position as its input entries.
+  *@throws ManifoldCFException if an ordering key is duplicated, or if a grab fails.
+  */
+  public T[] grabMultiple(IThreadContext threadContext, Class<T> clazz,
+    String[] orderingKeys, String[] classNames, ConfigParams[] configInfos, int[] maxPoolSizes)
+    throws ManifoldCFException
+  {
+    // Array.newInstance(clazz,...) really does produce a T[], so the cast is safe
+    @SuppressWarnings("unchecked")
+    T[] rval = (T[])Array.newInstance(clazz,classNames.length);
+
+    // Remember the original position of each ordering key, rejecting duplicates
+    Map<String,Integer> orderMap = new HashMap<String,Integer>();
+    for (int i = 0; i < orderingKeys.length; i++)
+    {
+      if (orderMap.containsKey(orderingKeys[i]))
+        throw new ManifoldCFException("Found duplicate order key");
+      orderMap.put(orderingKeys[i],Integer.valueOf(i));
+    }
+
+    // Sort a private copy.  Sorting the caller's array in place would silently break
+    // its positional correspondence with the other parallel arrays on the caller's side.
+    String[] sortedKeys = orderingKeys.clone();
+    java.util.Arrays.sort(sortedKeys);
+
+    for (int i = 0; i < sortedKeys.length; i++)
+    {
+      int index = orderMap.get(sortedKeys[i]).intValue();
+      try
+      {
+        rval[index] = grab(threadContext,classNames[index],configInfos[index],maxPoolSizes[index]);
+      }
+      catch (Throwable e)
+      {
+        // Roll back: release everything grabbed before the failing position,
+        // most recently grabbed first.
+        for (int j = i - 1; j >= 0; j--)
+        {
+          int grabbedIndex = orderMap.get(sortedKeys[j]).intValue();
+          try
+          {
+            release(rval[grabbedIndex]);
+          }
+          catch (ManifoldCFException e2)
+          {
+            // Best effort only; the original failure is the one we report
+          }
+        }
+        if (e instanceof ManifoldCFException)
+          throw (ManifoldCFException)e;
+        else if (e instanceof RuntimeException)
+          throw (RuntimeException)e;
+        else if (e instanceof Error)
+          throw (Error)e;
+        else
+          throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e);
+      }
+    }
+    return rval;
+  }
+
+  /** Grab one connector from the pool that matches the given class and configuration.
+  * The pool is created on first use; the call then waits inside the pool until a
+  * connector may be handed out.
+  *@param threadContext is the current thread context.
+  *@param className is the name of the connector class wanted.
+  *@param configInfo are the name/value pairs constituting the configuration of
+  * the connector wanted.
+  *@param maxPoolSize is the capacity given to the pool if this call has to create it.
+  *@return the connector handed out by the matching pool.
+  */
+  public T grab(IThreadContext threadContext,
+    String className, ConfigParams configInfo, int maxPoolSize)
+    throws ManifoldCFException
+  {
+    // The lookup key simply wraps the caller's configInfo; it is only good for
+    // the duration of the lookup and is never stored.
+    PoolKey lookupKey = new PoolKey(className,configInfo);
+
+    Pool pool;
+    synchronized (poolHash)
+    {
+      pool = poolHash.get(lookupKey);
+      if (pool == null)
+      {
+        // A key that lives in the map must own its own copy of the configuration,
+        // so this is the only place where we pay for duplicating it.
+        PoolKey storedKey = new PoolKey(className,configInfo.duplicate());
+        pool = new Pool(storedKey,maxPoolSize);
+        poolHash.put(storedKey,pool);
+      }
+    }
+
+    // Fetch the handle outside the map lock; this may wait for a free connector
+    return pool.getConnector(threadContext);
+  }
+
+  /** Release a whole array of connectors.
+  * Every connector is released even when an earlier release fails; the first
+  * ManifoldCFException encountered is thrown once all have been attempted.
+  *@param connectors are the connectors to release; null entries are skipped by release().
+  */
+  public void releaseMultiple(T[] connectors)
+    throws ManifoldCFException
+  {
+    ManifoldCFException firstFailure = null;
+    for (T connector : connectors)
+    {
+      try
+      {
+        release(connector);
+      }
+      catch (ManifoldCFException e)
+      {
+        // Keep going, but remember only the earliest problem
+        if (firstFailure == null)
+          firstFailure = e;
+      }
+    }
+    if (firstFailure != null)
+      throw firstFailure;
+  }
+
+  /** Release a connector back to the pool it belongs to.
+  * The pool is located from the connector's class name and current configuration.
+  *@param connector is the connector to release.  A null connector is ignored.
+  *@throws ManifoldCFException if no pool matches the connector's class and
+  * configuration, or if the pool fails to take the connector back.
+  */
+  public void release(T connector)
+    throws ManifoldCFException
+  {
+    // If the connector is null, skip the release, because we never really got the connector in the first place.
+    if (connector == null)
+      return;
+
+    // Figure out which pool this goes on, and put it there
+    PoolKey pk = new PoolKey(connector.getClass().getName(),connector.getConfiguration());
+    Pool p;
+    synchronized (poolHash)
+    {
+      p = poolHash.get(pk);
+    }
+
+    // No matching pool means the connector did not come from here, or its configuration
+    // no longer matches its pool key.  Report that properly rather than failing with a
+    // bare NullPointerException.
+    if (p == null)
+      throw new ManifoldCFException("Cannot release connector of class '"+pk.getClassName()+"': no pool matches its class and configuration");
+
+    p.releaseConnector(connector);
+  }
+
+  /** Idle notification for inactive connector handles.
+  * Asks every pool in turn to poll the connectors it currently holds idle.
+  *@param threadContext is the local thread context.
+  */
+  protected void pollAllConnectors(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // The map lock is held for the whole sweep
+    synchronized (poolHash)
+    {
+      for (Pool pool : poolHash.values())
+      {
+        pool.pollAll(threadContext);
+      }
+    }
+  }
+
+  /** Flush only those connector handles that are currently unused.
+  * This simply delegates to closeAllConnectors(), which disconnects and drops the
+  * connectors sitting idle on each pool's stack.  Connectors that are handed out
+  * have been removed from the stack by Pool.getConnector(), so they are not touched.
+  *@param threadContext is the local thread context.
+  */
+  protected void flushUnusedConnectors(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    closeAllConnectors(threadContext);
+  }
+
+  /** Clean up all open connector handles held by the pools.
+  * This method is called when the connector pool needs to be flushed,
+  * to free resources.  Each pool is asked to release everything it holds idle.
+  *@param threadContext is the local thread context.
+  */
+  protected void closeAllConnectors(IThreadContext threadContext)
+    throws ManifoldCFException
+  {
+    // The map lock is held while every pool is emptied
+    synchronized (poolHash)
+    {
+      for (Pool pool : poolHash.values())
+      {
+        pool.releaseAll(threadContext);
+      }
+    }
+  }
+
+  /** This is an immutable pool key class, which describes a pool in terms of two independent keys.
+  */
+  public static class PoolKey
+  {
+    protected final String className;
+    protected final ConfigParams configInfo;
+
+    /** Constructor.
+    */
+    public PoolKey(String className, Map configInfo)
+    {
+      this.className = className;
+      this.configInfo = new ConfigParams(configInfo);
+    }
+
+    public PoolKey(String className, ConfigParams configInfo)
+    {
+      this.className = className;
+      this.configInfo = configInfo;
+    }
+
+    /** Get the class name.
+    *@return the class name.
+    */
+    public String getClassName()
+    {
+      return className;
+    }
+
+    /** Get the config info.
+    *@return the params
+    */
+    public ConfigParams getParams()
+    {
+      return configInfo;
+    }
+
+    /** Hash code.
+    */
+    public int hashCode()
+    {
+      return className.hashCode() + configInfo.hashCode();
+    }
+
+    /** Equals operator.
+    */
+    public boolean equals(Object o)
+    {
+      if (!(o instanceof PoolKey))
+        return false;
+
+      PoolKey pk = (PoolKey)o;
+      return pk.className.equals(className) && pk.configInfo.equals(configInfo);
+    }
+
+  }
+
+  /** This class represents a value in the pool hash, which corresponds to a given key.
+  * It tracks the idle connectors parked in the pool (stack) and the number of
+  * connectors that may still be handed out (numFree).  All methods synchronize on
+  * the Pool instance, which is also the monitor used by wait() and notifyAll().
+  */
+  public class Pool
+  {
+    protected final List<T> stack = new ArrayList<T>(); // Idle connectors; the last element is the next one handed out
+    protected final PoolKey key; // Class name and configuration used when a new connector must be created
+    protected int numFree; // How many more connectors may be handed out before getConnector() has to wait
+
+    /** Constructor
+    *@param pk is the key describing the connectors this pool holds.
+    *@param maxCount is the initial value of numFree, i.e. the number of connectors
+    * that may be handed out at the same time.
+    */
+    public Pool(PoolKey pk, int maxCount)
+    {
+      key = pk;
+      numFree = maxCount;
+    }
+
+    /** Grab a connector.
+    * Waits while numFree is zero.  If no idle connector exists, construct one using
+    * the information in the pool key and connect it.
+    *@param threadContext is the thread context to attach to the connector.
+    *@return the connector, removed from the idle stack and with its thread context set.
+    *@throws ManifoldCFException with type INTERRUPTED if the wait is interrupted, or
+    * whatever creating the connector or setting its thread context throws.
+    */
+    public synchronized T getConnector(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      while (numFree == 0)
+      {
+        try
+        {
+          wait();
+        }
+        catch (InterruptedException e)
+        {
+          throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+        }
+      }
+
+      if (stack.size() == 0)
+      {
+        String className = key.getClassName();
+        ConfigParams configParams = key.getParams();
+
+        T newrc = createConnectorInstance(threadContext,className);
+        newrc.connect(configParams);
+        stack.add(newrc);
+      }
+      
+      // Since thread context set can fail, do that before we remove it from the pool; on failure the connector stays idle and numFree is unchanged.
+      T rc = stack.get(stack.size()-1);
+      rc.setThreadContext(threadContext);
+      stack.remove(stack.size()-1);
+      numFree--;
+      
+      return rc;
+    }
+
+    /** Release a connector to the pool.
+    * The connector becomes idle again, numFree goes up by one, and all threads
+    * waiting in getConnector() are woken.
+    *@param connector is the connector.  A null connector is ignored.
+    */
+    public synchronized void releaseConnector(T connector)
+      throws ManifoldCFException
+    {
+      if (connector == null)
+        return;
+
+      // Make sure connector knows it's released
+      connector.clearThreadContext();
+      // Append to the idle stack, so it is the next connector handed out
+      stack.add(connector);
+      numFree++;
+      notifyAll();
+    }
+
+    /** Notify all free connectors.
+    * Each idle connector is given the thread context for the duration of its poll()
+    * call; the thread context is cleared again even if poll() throws.
+    *@param threadContext is the thread context to poll under.
+    */
+    public synchronized void pollAll(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      int i = 0;
+      while (i < stack.size())
+      {
+        T rc = stack.get(i++);
+        // Notify the connector, with a thread context attached for the call
+        rc.setThreadContext(threadContext);
+        try
+        {
+          rc.poll();
+        }
+        finally
+        {
+          rc.clearThreadContext();
+        }
+      }
+    }
+
+    /** Release all free connectors.
+    * Idle connectors are disconnected and removed from the stack, last one first.
+    * A connector is only removed after disconnect() returns, so if disconnect()
+    * throws, that connector stays on the stack and the exception propagates.
+    * numFree is not changed by this method.
+    *@param threadContext is the thread context to disconnect under.
+    */
+    public synchronized void releaseAll(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      while (stack.size() > 0)
+      {
+        // Disconnect the connector on top of the idle stack
+        T rc = stack.get(stack.size()-1);
+        rc.setThreadContext(threadContext);
+        try
+        {
+          rc.disconnect();
+          stack.remove(stack.size()-1);
+        }
+        finally
+        {
+          rc.clearThreadContext();
+        }
+      }
+    }
+
+  }
+
+}

Propchange: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-781/framework/core/src/main/java/org/apache/manifoldcf/core/connectorpool/ConnectorPool.java
------------------------------------------------------------------------------
    svn:keywords = Id